Exectors框架 源码分析
1. 在阅读源码时做了大量的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限,并且代码阅读起来没有 IDE 方便,所以在 github 上提供JDK1.8 的源码、详细的注释及测试用例。欢迎大家 star、fork !
2. 由于个人水平有限,对源码的分析理解可能存在偏差或不透彻的地方还请大家在评论区指出,谢谢!
1. 基本结构
由于 Exector 这个家族还是比较大的,所以先导出一下类图,对这个家族有一个大概的认识。
2. Exector
可以看到,Exector 属于一个接口,其实它里面只有一个 void execute(Runnable)
方法。注意它里面的参数是 Runnable
,并且返回值是 void 类型。
3. ExectorService
紧接着就是另外一个 ExectorService
接口,这个接口继承了上面的 Exector
但是他在原来的接口上添加了非常多的方法,其中有两类方法比较重要。 其中一类就是submit()
这里提供了三个重载的方法:
1 | // 返回一个 Future 对象,表示将要执行的线程的状态 |
注意,这三个方法都是有返回值的,并且返回值都是一个 Future 对象,然后就是参数的问题,他们的参数可以是 Runnable 也可以是 Callable 。Runnable 我们都比较熟悉了,Callable 我们就可以理解成带有返回值的 Runnable。
然后另外一类方法就是用来判断线程状态的方法,以及操作线程池中的线程的方法。
1 | // 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。 |
4. AbstractExectorService
虽然又是一个抽象类,但是他完全没有一个抽象方法,就像 AQS 一样。然后主要讨论这里面的几个重要方法。他增加了两个重载方法,然后实现了接口的三个 submit()
。
先说 submit()
方法,这个方法如果接受的是 Runnable
参数的话就直接调用了 newTaskFor(Runnable runnable, T value)
,在前面会看到对于 Runnable
是有两个重载方法的,那么这里就给 T
传参不同,可以是 null
。然后另外一个传参为 callable
的就调用 newTaskFor(Callable<T> callable)
。可以看到,他们底层都是调用了 execute
,最后返回的还是 RunnableFuture
。RunnableFuture
这个就是继承了 Runnable
和 Future
接口的一个空接口。
1 | // 调用的都是 newTaskFor 的 Runnable 参数方法 |
好,既然上面的方法都调用了 newTaskFor
,那么这个方法就是 submit
的执行基础,接着看一下这个方法。
1 | // 这两个是 AES 的核心方法,主要就是创建任务 是 submit 的依赖 |
可以看到上面的方法全都是采用了 FutureTask
创建的新任务。返回后交给 execute
方法执行。那很明显, FutureTask
肯定是 RunnableFuture
的子类,确实 FutureTask
只实现了 RunnableFuture
这一个接口。
5. FutureTask 介绍
在前面看到,这个类实现了 RunnableFuture 然后 RunnableFuture 又继承了 Runnable 和 Future ,这个名字取的还真是可以,直接把两个接口组合了。Runnable 我们知道他就只有一个 Run 方法,用来放任务代码。而 Future 又是干嘛的呢。我们来看一下里面的方法,比较少。
1 | // 取消任务 |
好的,其实就是在原来的任务的基础上加上了关于状态判断和结果获取的方法,主要是结果的阻塞获取是非常重要的,这也是 Runnable 的一大缺陷。
FutureTask 其实就是 RunnableFuture 的一个实现类。所以说我们大致的看一下这个实现类就好。
1 | // 任务状态 |
里面的一些其他的方法就是用来对任务进行操作的,主要就是实现了父类的一些方法。里面也采用了 CAS 对线程的设定等待。
6. ThreadPoolExector
然后介绍完了 AES 以后,就是真正的可用的类了,这里我们首先介绍的是 ThreadPoolExector
也就是我们常说的线程池。但是我们很少看到看到我们直接的去 new 一个 ThreadPoolExector
而更常用的是采用 Executors
这个类去创建线程池。主要就是 Executors
算是一个工厂方法,new 一个线程池的过程是比较麻烦的,需要配置很多的参数,这个工厂方法把一些我们会经常用到的的线程池都通过一个工厂方法返回给我们,减少我们的工作量。
这个类其实也是比较复杂的,他的代码比较多,这里只分析一部分比较重要的参数和方法。
1. 字段
1. 基本字段
1 | // 任务的后备队列 |
其中比较重要的属性有: workQueue(任务队列)、一个 Set
其中池控参数是将两个只是变量都打包进去了,分别是 workerCount 和 runState 。
workerCount有效线程数.runState表明线程池的状态是否为运行,还是关闭。为了方便表示我们把 workerCount 和 runState 打包到了一个变量里面就是 ctl。
runState 表示生命周期,有以下状态:
RUNNING:接受新任务并处理排队的任务
SHUTDOWN:不接受新任务,但处理排队的任务
STOP:不接受新任务,不处理排队的任务,并中断正在进行的任务
TIDYING:所有任务都已终止,workerCount为零,线程转换到状态TIDYING 将运行terminate() 勾子
TERMINATED:terminated()已完成
并且这些值之间顺序很重要,以允许有序的比较。 runState在整个过程中是单调递增的但不需要经过每一个状态,具体规律如下:
- RUNNING -> SHUTDOWN 在执行 shutdown()的时候
- (RUNNING or SHUTDOWN) -> STOP 在执行shutdownNow()
- SHUTDOWN -> TIDYING 当任务队列和线程池为空的时候
- STOP -> TIDYING 当池为空的时候
- TIDYING -> TERMINATED 钩子方法调用完毕
2. Worker
这个类其实比较有意思,一般我们会想到它里面必然是有一个线程的引用的,也就是把线程用 Worker 类来包装一下。然后还有一个就是他当前正在执行的任务引用。
但是我们看这个类的继承及实现的时候就会发现比较有意思的事情,他继承了 AQS 实现了 Runnale 也就是他此时从结构来看既是一个 同步器
还是一个 任务
。
1. 字段
1 | // 线程 |
2. 方法
里面比较重要的就是一些锁的实现,这里采用的全是互斥锁。然后实现的 Runnable 的 run 就是执行当前的任务。待会看运用这些方法再具体分析,里面的代码不多。
3. BlockQueue
这里采用了 BlockQueue 来存放任务,然后 BlockQueue 其实是一个接口,他有很多子类,而我们使用 Exectors 工厂方法 new 出新的线程池的时候其实不同种类的线程池采用的也是不同的 BlockQueue 。
在 doc 中提到的子类有:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 但是我们真正常用的就是 :ArrayBlockingQueue,LinkedBlockingQueue, SynchronousQueue 而我们在 Exectors 中就用了 LinkedBlockingQueue, SynchronousQueue 这两种。这个到时候会专门写一篇关于阻塞队列的博客,不然这个文章的篇幅太大了。
4. ThreadFactory
好很明显,这是一个函数式接口,里面就只有一个 newThread 方法,在线程池中,如果我们没有自己传入的话采用的都是 Executors.defaultThreadFactory()
创建一个 非 Daemon 优先级为 NORM_PRIORITY 的线程。
这样主线程退出时不会直接退出JVM,而是等待线程池中的线程结束。所有线程都调为同一个级别,这样在操作系统角度来看所有系统都是公平的,不会导致竞争堆积。
2. 主要方法
1. ctor
1 | public ThreadPoolExecutor(int corePoolSize, //核心线程容量 |
2. execute
这个方法是把提交到线程池中的任务在将来的某个时间运行起来,但是不一定能够保证这个任务肯定能执行到,为什么这么说?这是由于我们在前面看到提交到线程池中的线程不一定被马上执行,如果说线程池中线程都在忙现有的任务,新提交的就会被放到任务队列中 (BlockQueue),然而我们在上面还看到了一个拒绝策略,就是当线程池饱和或者线程池马上关闭了提交的任务会被拒绝,在这里就是抛出异常!
好,现在来理一下 execute
方法的执行过程。
如果少于corePoolSize线程正在运行,开始一个新的线程领取任务。 对addWorker的调用会自动检查runState和workerCount,从而防止可能会添加的错误警报线程时它不应该通过返回false。
把任务放入后备队列,如果成功我们还是需要再次检查,因为自上次检查依赖我们可能遇到线程池关闭
如果有必要的话需要回退任务队列如果排队失败,我们会尝试添加一个新的线程。而线程添加失败则说明线程池关闭了或者
已经处于饱和状态
1 | public void execute(Runnable command) { |
3. addWorker
这个方法的功能是:根据当前线程池的状态和给的边界条件来检测是否需要添加新的线程,如果是,则添加到线程队列中并调整工作线程数并启动线程执行第一个任务。 如果该方法检测到线程池处于STOP状态或者是察觉到将要停止,则返回false。 如果线程工厂创建线程失败(可能是由于发生了OOM异常)则也返回false。
今天有点累 ,先写到这里!明天继续,清明小长假还要继续学习 :(